package com.ideal.hadoopadmin.api.hdfs;

import com.ideal.hadoopadmin.api.linux.UserAPI;
import com.ideal.hadoopadmin.crontab.hdfs.FlushHDFSInfo;
import com.ideal.hadoopadmin.crontab.hive.FlushHiveInfo;
import com.ideal.hadoopadmin.crontab.property.Properties;
import com.ideal.service.hdfs.HDFSService;
import com.ideal.tools.ssh.common.CommonProperties;
import com.ideal.tools.ssh.common.CommonTools;
import com.ideal.tools.ssh.common.OperationMarket;
import com.ideal.tools.ssh.context.ClusterContext;
import com.ideal.tools.ssh.entity.LinuxMachine;
import com.ideal.tools.ssh.operation.LinuxOperation;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by CC on 2016/3/14.
 */
/**
 * Static entry points for HDFS-related cluster operations: user quotas,
 * public table directory creation/removal, and group-based access control.
 *
 * <p>Each method reads its arguments from the {@link CommonProperties} carried by
 * the given {@link ClusterContext}, queues the relevant operations on the matching
 * machines (NameNode and/or Hive roles), then triggers execution via
 * {@code context.doTheThing()}.
 *
 * Created by CC on 2016/3/14.
 */
public class HDFSAPI {

    // Argument keys looked up in CommonProperties, plus the TRUE/FALSE marker
    // values used by HDFS_HAS_OTHER_ACCESS. Declared final so callers cannot
    // accidentally reassign the keys at runtime.
    public static final String HDFS_QUOTA_SPACE_SIZE = "HDFS_QUOTA_SPACE_SIZE";
    public static final String HDFS_QUOTA_DIR_NUMBER = "HDFS_QUOTA_DIR_NUMBER";
    public static final String HDFS_PUB_TABLE_DIR = "HDFS_PUB_TABLE_DIR";
    public static final String HDFS_HAS_OTHER_ACCESS = "HDFS_HAS_OTHER_ACCESS";
    public static final String HDFS_TRUE = "TRUE";
    public static final String HDFS_FALSE = "FALSE";

    /**
     * Applies HDFS quota settings (space quota and/or directory-count quota) for a user.
     *
     * <p>Expected context arguments:
     * <ul>
     *   <li>machineList — only NameNode machines are used</li>
     *   <li>{@code Cluster_User_Name} — the user name (mandatory)</li>
     *   <li>{@code HDFS_QUOTA_SPACE_SIZE} — space quota</li>
     *   <li>{@code HDFS_QUOTA_DIR_NUMBER} — directory-count quota</li>
     * </ul>
     * At least one of the two quota values must be non-blank, otherwise the call is a no-op.
     *
     * @param context cluster context carrying the machine list and arguments
     */
    public static void QuotaHDFS(ClusterContext context){
        List<LinuxMachine> machineList = context.getOriginalList();
        CommonProperties commonProperties = context.getCommonProperties();

        String userName = commonProperties.getArgument(UserAPI.Cluster_User_Name, "");
        String spaceSize = commonProperties.getArgument(HDFS_QUOTA_SPACE_SIZE, "");
        String dirNumber = commonProperties.getArgument(HDFS_QUOTA_DIR_NUMBER, "");

        // A user name is mandatory.
        if (StringUtils.isBlank(userName)) {
            return;
        }
        // The two quotas may not both be blank — at least one must be provided.
        if (StringUtils.isBlank(spaceSize) && StringUtils.isBlank(dirNumber)) {
            return;
        }

        // The webapp machine hosts the scripts, so the NameNode machines are the executors.
        List<LinuxMachine> finalMachines = CommonTools.getMachineListByType(machineList, context,
                LinuxMachine.MachineType.NameNode);
        for (LinuxMachine machine : finalMachines) {
            machine.initOperation(OperationMarket.QuotaHDFS(userName, spaceSize, dirNumber), true);
        }

        // Replace the context's machine list with the filtered set, then execute.
        context.setMachineList(finalMachines);
        context.doTheThing();
    }


    /**
     * Creates a public table directory in HDFS for the given user, together with the
     * matching Linux group (on NameNode machines) and a Sentry role (on Hive machines).
     *
     * <p>Expected context arguments:
     * <ul>
     *   <li>machineList — should contain the NameNode machine(s)</li>
     *   <li>{@code Cluster_User_Name} — the owning user name (mandatory)</li>
     *   <li>{@code HDFS_PUB_TABLE_DIR} — table name to create (must not contain '/')</li>
     * </ul>
     *
     * @param context cluster context carrying the machine list and arguments
     */
    public static void MakeHDFSPubDir(ClusterContext context){
        List<LinuxMachine> machineList = context.getOriginalList();
        CommonProperties commonProperties = context.getCommonProperties();

        String userName = commonProperties.getArgument(UserAPI.Cluster_User_Name, "");
        String tblDIR = commonProperties.getArgument(HDFSAPI.HDFS_PUB_TABLE_DIR, "");

        // Guard BEFORE deriving paths/groups/roles from the user name: previously the
        // derivations ran even when userName was blank.
        if (StringUtils.isBlank(userName)) {
            return;
        }

        String dir = CommonTools.getHDFSPublicTblDir(userName, tblDIR, commonProperties);
        String owner = userName;
        String group = CommonTools.getHDFSPubDirGroup(userName, tblDIR);
        String mode = "750";
        String dbRole = CommonTools.getHiveRole("tb", "select", owner + "_" + tblDIR, context.getCommonProperties());

        // Merge operations so that each physical machine appears once with all of its roles.
        List<LinuxMachine> finalList = CommonTools.megerMachineOperation(machineList, context);
        for (LinuxMachine machine : finalList) {
            for (LinuxMachine.MachineType machineType : machine.getMachineRoleTypes()) {
                if (machineType == LinuxMachine.MachineType.NameNode) {
                    // First create the HDFS directory with the derived owner/group/mode...
                    machine.initOperation(OperationMarket.MakeHDFSDir(dir, owner, group, mode), true);
                    // ...then add the matching Linux group.
                    machine.initOperation(OperationMarket.AddLinuxGroup(group));
                } else if (machineType == LinuxMachine.MachineType.Hive) {
                    machine.initOperation(OperationMarket.CreateSentryRole(dbRole));
                }
            }
        }

        // Replace the context's machine list with the merged set, then execute.
        context.setMachineList(finalList);
        context.doTheThing();
    }

    /**
     * Removes a user's public table directory from HDFS and deletes the matching
     * Linux group, on NameNode machines.
     *
     * <p>Expected context arguments:
     * <ul>
     *   <li>machineList — should contain the NameNode machine(s)</li>
     *   <li>{@code Cluster_User_Name} — the owning user name (mandatory)</li>
     *   <li>{@code HDFS_PUB_TABLE_DIR} — table name to remove (must not contain '/')</li>
     * </ul>
     *
     * @param context cluster context carrying the machine list and arguments
     */
    public static void RMHDFSPubDir(ClusterContext context){
        List<LinuxMachine> machineList = context.getOriginalList();
        CommonProperties commonProperties = context.getCommonProperties();

        String userName = commonProperties.getArgument(UserAPI.Cluster_User_Name, "");
        String tblDIR = commonProperties.getArgument(HDFSAPI.HDFS_PUB_TABLE_DIR, "");

        // Guard before deriving values from the (possibly blank) user name.
        if (StringUtils.isBlank(userName)) {
            return;
        }

        String dir = CommonTools.getHDFSPublicTblDir(userName, tblDIR, commonProperties);
        String group = CommonTools.getHDFSPubDirGroup(userName, tblDIR);

        List<LinuxMachine> finalList = CommonTools.megerMachineOperation(machineList, context);

        for (LinuxMachine machine : finalList) {
            for (LinuxMachine.MachineType machineType : machine.getMachineRoleTypes()) {
                // Only NameNode machines take part; there is no Hive-side cleanup here.
                if (machineType == LinuxMachine.MachineType.NameNode) {
                    // Remove the shared directory...
                    machine.initOperation(OperationMarket.RMHDFSDir(dir), true);
                    // ...and delete the matching Linux group.
                    machine.initOperation(OperationMarket.DelLinuxGroup(group));
                }
            }
        }

        // Replace the context's machine list with the merged set, then execute.
        context.setMachineList(finalList);
        context.doTheThing();
    }

    /**
     * Grants a user access to another user's public table directory by joining the
     * user to the owner's user group (for the parent directory) and the table group
     * (for the directory itself), on NameNode machines.
     *
     * <p>Expected context arguments:
     * <ul>
     *   <li>machineList — should contain the NameNode machine(s)</li>
     *   <li>{@code Cluster_User_Name} — the user being granted access</li>
     *   <li>{@code Cluster_HDFS_PATH_OWNER} — the owner of the directory</li>
     *   <li>{@code HDFS_PUB_TABLE_DIR} — table name (must not contain '/')</li>
     * </ul>
     *
     * @param context cluster context carrying the machine list and arguments
     */
    public static void accessUserToGroup(ClusterContext context){
        List<LinuxMachine> machineList = context.getOriginalList();
        CommonProperties commonProperties = context.getCommonProperties();

        String tblDIR = commonProperties.getArgument(HDFSAPI.HDFS_PUB_TABLE_DIR, "");
        // The user being granted access.
        String userName = commonProperties.getArgument(UserAPI.Cluster_User_Name, "");
        // The user that owns the directory.
        String owner = commonProperties.getArgument(UserAPI.Cluster_HDFS_PATH_OWNER, "");

        String userGroup = CommonTools.getUserGroupByUser(owner, commonProperties);
        String tblGroup = CommonTools.getHDFSPubDirGroup(owner, tblDIR);

        // The user must join two groups: the owner's user group (to traverse the parent
        // directory) and the table group (to access the directory itself).
        String groups = userGroup + "," + tblGroup;
        List<LinuxMachine> finalMachines = CommonTools.megerMachineOperation(machineList, context);
        for (LinuxMachine machine : finalMachines) {
            for (LinuxMachine.MachineType machineType : machine.getMachineRoleTypes()) {
                if (machineType == LinuxMachine.MachineType.NameNode) {
                    machine.initOperation(OperationMarket.JoinUserToGroup(userName, groups));
                }
            }
        }

        // Replace the context's machine list with the merged set, then execute.
        context.setMachineList(finalMachines);
        context.doTheThing();
    }


    /**
     * Revokes a user's access to another user's public table directory by removing the
     * user from the table group, and — when the user has no other access under the same
     * owner ({@code HDFS_HAS_OTHER_ACCESS} = {@code FALSE}) — from the owner's user
     * group as well. Runs on NameNode machines.
     *
     * <p>Expected context arguments:
     * <ul>
     *   <li>machineList — should contain the NameNode machine(s)</li>
     *   <li>{@code Cluster_User_Name} — the user losing access</li>
     *   <li>{@code Cluster_HDFS_PATH_OWNER} — the owner of the directory</li>
     *   <li>{@code HDFS_PUB_TABLE_DIR} — table name (must not contain '/')</li>
     *   <li>{@code HDFS_HAS_OTHER_ACCESS} — TRUE/FALSE, defaults to TRUE</li>
     * </ul>
     *
     * @param context cluster context carrying the machine list and arguments
     */
    public static void refuseUserToGroup(ClusterContext context){
        List<LinuxMachine> machineList = context.getOriginalList();
        CommonProperties commonProperties = context.getCommonProperties();

        String tblDIR = commonProperties.getArgument(HDFSAPI.HDFS_PUB_TABLE_DIR, "");
        String userName = commonProperties.getArgument(UserAPI.Cluster_User_Name, "");
        String hdfsPathOwner = commonProperties.getArgument(UserAPI.Cluster_HDFS_PATH_OWNER, "");
        // Defaults to TRUE: by default we only leave the table group, not the user group.
        String hasOtherAccess = commonProperties.getArgument(HDFSAPI.HDFS_HAS_OTHER_ACCESS, HDFSAPI.HDFS_TRUE);

        String userGroup = CommonTools.getUserGroupByUser(hdfsPathOwner, commonProperties);
        String tblGroup = CommonTools.getHDFSPubDirGroup(hdfsPathOwner, tblDIR);

        List<LinuxMachine> finalMachines = CommonTools.megerMachineOperation(machineList, context);
        for (LinuxMachine machine : finalMachines) {
            for (LinuxMachine.MachineType machineType : machine.getMachineRoleTypes()) {
                if (machineType == LinuxMachine.MachineType.NameNode) {
                    // Always remove the user from the table group.
                    machine.initOperation(OperationMarket.RemoveUserFromGroup(userName, tblGroup));
                    // When the user has no other access, also remove the parent (user) group.
                    // Constant-first equals() is null-safe even if the default ever changes.
                    if (HDFSAPI.HDFS_FALSE.equals(hasOtherAccess)) {
                        machine.initOperation(OperationMarket.RemoveUserFromGroup(userName, userGroup));
                    }
                }
            }
        }

        // Replace the context's machine list with the merged set, then execute.
        context.setMachineList(finalMachines);
        context.doTheThing();
    }

    /**
     * Home page / metadata management: refreshes the HDFS listing
     * (refreshes meta_hdfs_info_bak) via the daily-refresh crontab job.
     *
     * @param context cluster context
     */
    public static void callFlushHdfsInfoBak(ClusterContext context){
        new FlushHDFSInfo().HDFSDailyRefreshNew(context);
    }

    /**
     * Refreshes HDFS information (newer implementation backed by {@link HDFSService}).
     *
     * @param context cluster context
     */
    public static void flushHdfsInfoBak(ClusterContext context){
        new HDFSService().refreshHDFS(context);
    }
}
